-
Notifications
You must be signed in to change notification settings - Fork 0
KAFKA-16913 - Support external schemas in JSONConverter #10
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
Conversation
Contributes to: event-integration/eventstreams-planning/12766 KIP Link: https://cwiki.apache.org/confluence/display/KAFKA/KIP-1054%3A+Support+external+schemas+in+JSONConverte Signed-off-by: Priyanka K U <[email protected]>
Contributes to: event-integration/eventstreams-planning/12766 KIP Link: https://cwiki.apache.org/confluence/display/KAFKA/KIP-1054%3A+Support+external+schemas+in+JSONConverte Signed-off-by: Priyanka K U <[email protected]>
Contributes to: event-integration/eventstreams-planning/12766 KIP Link: https://cwiki.apache.org/confluence/display/KAFKA/KIP-1054%3A+Support+external+schemas+in+JSONConverte Signed-off-by: Priyanka K U <[email protected]>
I have added this Test case, thanks for suggesting. |
Hi, @FrankYang0529 |
WalkthroughThis update introduces a new configuration property, Changes
Sequence Diagram(s)sequenceDiagram
participant User
participant JsonConverter
participant JsonConverterConfig
User->>JsonConverter: configure(props)
JsonConverter->>JsonConverterConfig: new JsonConverterConfig(props)
JsonConverterConfig-->>JsonConverter: schemaContent()
JsonConverter->>JsonConverter: Parse schemaContent, set schema field
User->>JsonConverter: toConnectData(topic, value)
alt schema is set
JsonConverter->>JsonConverter: Use configured schema to convert value
else schema not set
JsonConverter->>JsonConverter: Use envelope logic (expect schema+payload in value)
end
Poem
✨ Finishing Touches
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
@coderabbitai full review |
✅ Actions performedFull review triggered. |
@coderabbitai full review |
✅ Actions performedFull review triggered. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (2)
connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverterConfig.java (1)
40-43
: Fix typo in schema.content documentationThe documentation string contains a typo: "...the schema will should be in...". Remove the word "will" or "should" for clarity.
- private static final String SCHEMA_CONTENT_DOC = "When set, this is used as the schema for all messages. Otherwise, the schema will should be in the contents of each message."; + private static final String SCHEMA_CONTENT_DOC = "When set, this is used as the schema for all messages. Otherwise, the schema should be in the contents of each message.";connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java (1)
363-369
: Be more specific in the error message about external schemasWhile the implementation is correct, the error message could be improved to inform users about the new
schema.content
option.if (config.schemasEnabled()) { if (this.schema != null) { return new SchemaAndValue(schema, convertToConnect(schema, jsonValue, config)); } else if (!jsonValue.isObject() || jsonValue.size() != 2 || !jsonValue.has(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME) || !jsonValue.has(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME)) { - throw new DataException("JsonConverter with schemas.enable requires \"schema\" and \"payload\" fields and may not contain additional fields." + - " If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration."); + throw new DataException("JsonConverter with schemas.enable requires \"schema\" and \"payload\" fields and may not contain additional fields." + + " If you are trying to deserialize plain JSON data, either set schemas.enable=false or provide a schema via schema.content in your converter configuration."); } } else {
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
(3 hunks)connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverterConfig.java
(5 hunks)connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
(2 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java (1)
connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverterConfig.java (1)
JsonConverterConfig
(32-156)
🔇 Additional comments (11)
connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverterConfig.java (3)
96-96
: LGTM! Well-structured field addition for external schema storageThe private field for storing schema content as a byte array is properly aligned with the other cached config fields.
104-105
: LGTM! Efficient handling of schema contentConverting the schema content string to UTF-8 bytes in the constructor is efficient as it avoids repeated conversions during operation.
143-153
: LGTM! Clear and well-documented accessor methodThe documentation clearly explains when the schema content will be used and its relationship to the schemas.enable setting.
connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java (2)
71-73
: LGTM! Clearly documented new field for external schema storageThe comment explains the purpose of the new field well, indicating that it will be used for all messages when provided.
299-308
: LGTM! Robust schema loading implementationThe implementation correctly handles schema parsing by:
- Only attempting to parse non-empty schema content
- Using appropriate exception handling with clear error messages
- Wrapping the serialization exception in a DataException for consistent error handling
connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java (6)
983-987
: LGTM! Good test for null schema content configurationThis test verifies that when schema.content is set to null, the converter behaves as it did before, requiring the JSON to contain schema and payload fields.
989-993
: LGTM! Good test for empty schema content configurationThis test verifies that when schema.content is an empty string, the converter still requires the JSON to contain schema and payload fields.
995-999
: LGTM! Good test for valid schema content configurationThis test verifies that when schema.content is a valid schema, the converter directly uses that schema to deserialize the JSON payload without requiring the envelope structure.
1001-1007
: LGTM! Good test for invalid schema content configurationThis test verifies that an exception is thrown when an invalid schema is provided via configuration.
1009-1014
: LGTM! Great edge case testing for complex schema structuresThis test handles the interesting case where the provided schema itself contains fields named "schema" and "payload", which could potentially conflict with the envelope structure. The test verifies that the converter correctly uses the provided schema rather than trying to interpret the input as an envelope.
1016-1029
: LGTM! Comprehensive error case testing with parameterized testsThe parameterized test is an excellent approach to verify that appropriate exceptions are thrown for various malformed JSON inputs when no schema content is provided.
/gemini review |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request introduces a new parameter, schema.content
, to the JSONConverter, allowing users to provide the schema externally, reducing message size and facilitating the use of more complex schemas. The changes look good overall, and the tests cover the new functionality well. Here are a few suggestions for improvement.
Summary of Findings
- Schema Content Validation: The code includes validation for the schema content provided in the configuration. It would be beneficial to provide more specific error messages to help users diagnose issues with their schema definitions.
- Schema Usage: The code checks if
this.schema
is not null before using it. Consider adding a check to ensure thatconfig.schemasEnabled()
is also true whenthis.schema
is not null to prevent unexpected behavior. - Test Coverage: The tests cover various scenarios, including null, empty, valid, and invalid schema content. Consider adding a test case to verify that the converter correctly handles a schema with a default value when
replace.null.with.default
is enabled.
Merge Readiness
The pull request is almost ready for merging. I recommend addressing the comments related to schema content validation and schema usage to improve the user experience and prevent potential issues. I am unable to approve the pull request, and users should have others review and approve this code before merging. Once these issues are addressed, the pull request should be in good shape to be merged.
converter.configure(Map.of(), false); | ||
assertThrows( | ||
DataException.class, | ||
() -> converter.toConnectData(TOPIC, value.getBytes())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When using a connector that requires a schema, such as JDBC connectors, with JSON messages, the current JSONConverter necessitates including the schema within every message. To address this, we are introducing a new parameter, schema.content, which allows you to provide the schema externally. This approach not only reduces the size of the messages but also facilitates the use of more complex schemas.
KIP : https://cwiki.apache.org/confluence/display/KAFKA/KIP-1054%3A+Support+external+schemas+in+JSONConverter
Summary of testing strategy (including rationale)
for the feature or bug fix. Unit and/or integration
tests are expected for any behaviour change and
system tests should be considered for larger changes.
Committer Checklist (excluded from commit message)
Summary by CodeRabbit
New Features
Bug Fixes
Tests